Wizcli improvements #12446


Open · wants to merge 31 commits into base: bugfix
Conversation

OsamaMahmood (Contributor)

Description

  1. Core Functionality:
  • Implemented a robust parsing system for different types of Wiz CLI scan results:
    • Library vulnerabilities
    • Secrets detection
    • OS package vulnerabilities
    • Infrastructure as Code (IaC) rule matches
  2. Deduplication Improvements:
  • Enhanced the _generate_unique_id method to ensure consistent finding deduplication using unique_id_from_tool:
    • Now uses sorted components for stable hash generation
    • Properly handles None values and whitespace
    • Fixed docstring formatting and code style issues
    • Components are now consistently ordered regardless of input order
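The deduplication behaviour described in the bullets above could be sketched as follows. The `_generate_unique_id` implementation itself is not shown on this page, so this is a hypothetical reconstruction: the method name comes from the bullet list, while the SHA-256 digest and the `|` separator are assumptions.

```python
import hashlib


def generate_unique_id(components):
    """Build a stable unique_id_from_tool value from finding attributes.

    Sketch of the behaviour described in the PR: None values are dropped,
    whitespace is stripped, and the remaining parts are sorted so the hash
    is identical regardless of input order.
    """
    cleaned = sorted(str(c).strip() for c in components if c is not None)
    return hashlib.sha256("|".join(cleaned).encode("utf-8")).hexdigest()
```

With this shape, `["b", None, " a "]` and `["a", "b"]` produce the same identifier, which is the order- and whitespace-independence the PR description claims.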

Checklist

This checklist is for your information.

  • Make sure to rebase your PR against the very latest dev branch.
  • Features/changes should be submitted against the dev branch.
  • Bugfixes should be submitted against the bugfix branch.
  • Give a meaningful name to your PR, as it may end up being used in the release notes.
  • Your code is flake8 compliant.
  • Your code is python 3.11 compliant.
  • If this is a new feature and not a bug fix, you've included the proper documentation in the docs at https://github.com/DefectDojo/django-DefectDojo/tree/dev/docs as part of this PR.
  • Model changes must include the necessary migrations in the dojo/db_migrations folder.
  • Add applicable tests to the unit tests.
  • Add the proper label to categorize your PR.

@github-actions github-actions bot added settings_changes Needs changes to settings.py based on changes in settings.dist.py included in this PR parser labels May 14, 2025

dryrunsecurity bot commented May 14, 2025

DryRun Security

This pull request contains multiple security vulnerabilities, including potential information disclosure in error logging, a possible denial of service risk through resource exhaustion, and a hardcoded service account key in test data, which could expose sensitive credentials and system information if not properly addressed.

⚠️ Potential Information Disclosure in Error Logging in dojo/tools/wizcli_dir/parser.py
Vulnerability Potential Information Disclosure in Error Logging
Description Error messages in the Wizcli directory parser include exception details. If these logs or exceptions are not carefully handled, they could expose internal system information. Implementing more generic error messages and ensuring proper exception handling is recommended to mitigate potential information disclosure.

import json
import logging

from dojo.tools.wizcli_common_parsers.parsers import WizcliParsers

logger = logging.getLogger(__name__)


class WizcliDirParser:

    """Wiz CLI Directory/IaC Scan results in JSON file format."""

    def get_scan_types(self):
        return ["Wizcli Dir Scan"]

    def get_label_for_scan_types(self, scan_type):
        return "Wiz CLI Scan (Directory)"

    def get_description_for_scan_types(self, scan_type):
        return "Parses Wiz CLI Directory/IaC scan results in JSON format, creating granular findings for vulnerabilities and secrets."

    def get_findings(self, file, test):
        """Processes the JSON report and returns a list of DefectDojo Finding objects."""
        try:
            scan_data = file.read()
            if isinstance(scan_data, bytes):
                # Try decoding common encodings
                try:
                    scan_data = scan_data.decode("utf-8-sig")  # Handles BOM
                except UnicodeDecodeError:
                    scan_data = scan_data.decode("utf-8")  # Fallback
            data = json.loads(scan_data)
        except json.JSONDecodeError as e:
            msg = f"Invalid JSON format: {e}"
            logger.error(msg)
            raise ValueError(msg) from e
        except Exception as e:
            msg = f"Error processing report file: {e}"
            logger.error(msg)
            raise ValueError(msg) from e

        findings = []
        results_data = data.get("result", {})
        if not results_data:
            logger.warning("No 'result' key found in the Wiz report. Unable to parse findings.")
            return findings

        # Parse Libraries (Vulnerabilities)
        libraries = results_data.get("libraries")
        if libraries:
            logger.debug(f"Parsing {len(libraries)} library entries.")
            findings.extend(WizcliParsers.parse_libraries(libraries, test))
        else:
            logger.debug("No 'libraries' data found in results.")

        # Parse Secrets
        secrets = results_data.get("secrets")
        if secrets:
            logger.debug(f"Parsing {len(secrets)} secret entries.")
            findings.extend(WizcliParsers.parse_secrets(secrets, test))
        else:
            logger.debug("No 'secrets' data found in results.")

        logger.info(f"WizcliDirParser processed {len(findings)} findings.")
        return findings
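The bot's recommendation above, more generic error messages with full details kept server-side, could look roughly like the sketch below. `load_report` is a hypothetical helper, not part of the PR; only the pattern (log the exception, raise a generic `ValueError`) is the point.

```python
import json
import logging

logger = logging.getLogger(__name__)


def load_report(raw):
    """Parse report bytes/str while keeping user-facing errors generic.

    Full tracebacks go to the server logs via logger.exception; the
    exception surfaced to the caller carries no internal details.
    """
    try:
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8-sig")  # handles BOM, falls through for plain UTF-8
        return json.loads(raw)
    except json.JSONDecodeError:
        logger.exception("Failed to parse Wiz CLI report")  # details stay in logs
        raise ValueError("Invalid JSON format in report file") from None
```

The `from None` suppresses exception chaining so the original parser error text never reaches the API response.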

⚠️ Potential Denial of Service via Resource Exhaustion in dojo/tools/wizcli_img/parser.py
Vulnerability Potential Denial of Service via Resource Exhaustion
Description The get_findings method reads entire file contents without size limits, which could lead to memory exhaustion if a very large file is processed. Implementing file size limits and streaming parsing techniques would help prevent potential denial of service attacks.

import json
import logging

from dojo.tools.wizcli_common_parsers.parsers import WizcliParsers  # Adjust import path

logger = logging.getLogger(__name__)


class WizcliImgParser:

    """Wiz CLI Container Image Scan results in JSON file format."""

    def get_scan_types(self):
        # Use a distinct name for image scans
        return ["Wizcli Img Scan"]

    def get_label_for_scan_types(self, scan_type):
        return "Wiz CLI Scan (Image)"

    def get_description_for_scan_types(self, scan_type):
        return "Parses Wiz CLI Container Image scan results in JSON format."

    def get_findings(self, file, test):
        try:
            scan_data = file.read()
            if isinstance(scan_data, bytes):
                try:
                    scan_data = scan_data.decode("utf-8-sig")
                except UnicodeDecodeError:
                    scan_data = scan_data.decode("utf-8")
            data = json.loads(scan_data)
        except json.JSONDecodeError as e:
            msg = f"Invalid JSON format: {e}"
            logger.error(msg)
            raise ValueError(msg) from e
        except Exception as e:
            msg = f"Error processing report file: {e}"
            logger.error(msg)
            raise ValueError(msg) from e

        findings = []
        results_data = data.get("result", {})
        if not results_data:
            logger.warning("No 'result' key found in the Wiz report.")
            return findings

        # Parse OS Packages - Key difference for image scans
        os_packages = results_data.get("osPackages")
        if os_packages:
            logger.debug(f"Parsing {len(os_packages)} OS package entries.")
            findings.extend(WizcliParsers.parse_os_packages(os_packages, test))
        else:
            logger.debug("No 'osPackages' data found in results.")

        # Parse Libraries (if present in image scans)
        libraries = results_data.get("libraries")
        if libraries:
            logger.debug(f"Parsing {len(libraries)} library entries.")
            findings.extend(WizcliParsers.parse_libraries(libraries, test))
        else:
            logger.debug("No 'libraries' data found in results.")

        # Parse Secrets (if present in image scans)
        secrets = results_data.get("secrets")
        if secrets:
            logger.debug(f"Parsing {len(secrets)} secret entries.")
            findings.extend(WizcliParsers.parse_secrets(secrets, test))
        else:
            logger.debug("No 'secrets' data found in results.")

        logger.info(f"WizcliImgParser processed {len(findings)} findings.")
        return findings
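The size limit the bot recommends for the unbounded `file.read()` could be sketched as below. `MAX_REPORT_BYTES` and `read_limited` are assumptions for illustration, not part of the PR or of DefectDojo.

```python
import io

MAX_REPORT_BYTES = 50 * 1024 * 1024  # assumed 50 MiB cap, not from the PR


def read_limited(file, limit=MAX_REPORT_BYTES):
    """Read at most `limit` bytes and reject oversized reports.

    Reading limit + 1 bytes lets us detect overflow without ever
    buffering the whole oversized file in memory.
    """
    data = file.read(limit + 1)
    if len(data) > limit:
        raise ValueError("Report file exceeds maximum allowed size")
    return data
```

A parser would then call `read_limited(file)` instead of `file.read()` before decoding and `json.loads`.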

⚠️ Hardcoded Service Account Key in unittests/scans/wizcli_img/wizcli_img_one_vul.json
Vulnerability Hardcoded Service Account Key
Description A GCP Service Account Key is present in the test JSON file. Even in test data, hardcoding service account details poses a security risk. Ensure that such sensitive credentials are never committed to version control, even in test fixtures, and use secure secret management practices.

{
"id": "8001d6bd-2b30-419d-8819-a3e962c90d42",
"projects": null,
"createdAt": "2025-05-07T13:46:45.864014091Z",
"startedAt": "2025-05-07T13:46:31.95780963Z",
"createdBy": {
"serviceAccount": {
"id": "hycyzczp25cxpbmp67mtt2cg4mcadi4doz2fey4y4bgrqmk5b2ugs"
}
},
"status": {
"state": "SUCCESS",
"verdict": "FAILED_BY_POLICY"
},
"policies": [
{
"id": "9bf73b16-99e7-4a54-af1e-dcfa1436a8f2",
"name": "test Default vulnerabilities policy ( Updated )",
"description": "Default built-in policy",
"type": "VULNERABILITIES",
"builtin": false,
"projects": null,
"policyLifecycleEnforcements": [
{
"enforcementMethod": "BLOCK",
"deploymentLifecycle": "CLI"
}
],
"ignoreRules": null,
"lifecycleTargets": null,
"Default": false,
"params": {
"__typename": "cicdscanpolicyparamsvulnerabilities",
"severity": "HIGH",
"packageCountThreshold": 1,
"ignoreUnfixed": true,
"packageAllowList": [],
"detectionMethods": [
"PACKAGE",
"LIBRARY",
"FILE_PATH"
],
"vulnerabilities": [],
"fixGracePeriodHours": 0,
"publishGracePeriodHours": 0,
"ignoreTransitiveVulnerabilities": true
}
},
{
"id": "f3393997-29e9-4d15-b490-b91f575aebef",
"name": "Default malware policy",
"description": "Default built-in policy for malware scanning",
"type": "MALWARE",
"builtin": true,
"projects": null,
"policyLifecycleEnforcements": [
{
"enforcementMethod": "AUDIT",
"deploymentLifecycle": "CLI"
}
],
"ignoreRules": null,
"lifecycleTargets": null,
"Default": false,
"params": {
"__typename": "cicdscanpolicyparamsmalware",
"malwareFindingSeverityThreshold": "HIGH",
"malwareFindingConfidenceLevelThreshold": "HIGH",
"countThreshold": 1
}
},
{
"id": "9c6726d0-1ada-4541-b6d6-3da5ca1124f9",
"name": "test Default vulnerabilities policy",
"description": "Default built-in policy",
"type": "VULNERABILITIES",
"builtin": false,
"projects": null,
"policyLifecycleEnforcements": [
{
"enforcementMethod": "BLOCK",
"deploymentLifecycle": "CLI"
}
],
"ignoreRules": null,
"lifecycleTargets": null,
"Default": false,
"params": {
"__typename": "cicdscanpolicyparamsvulnerabilities",
"severity": "HIGH",
"packageCountThreshold": 1,
"ignoreUnfixed": true,
"packageAllowList": [],
"detectionMethods": [],
"vulnerabilities": [],
"fixGracePeriodHours": 0,
"publishGracePeriodHours": 0,
"ignoreTransitiveVulnerabilities": true
}
},
{
"id": "5a03dfb5-99ff-49b6-8a48-a9b65b13bf9a",
"name": "test Default secrets policy",
"description": "Default built-in policy for secret scanning",
"type": "SECRETS",
"builtin": false,
"projects": null,
"policyLifecycleEnforcements": [
{
"enforcementMethod": "BLOCK",
"deploymentLifecycle": "CLI"
}
],
"ignoreRules": null,
"lifecycleTargets": null,
"Default": false,
"params": {
"__typename": "cicdscanpolicyparamssecrets",
"countThreshold": 1,
"pathAllowList": [
"/.git/config",
".git/config"
],
"secretFindingSeverityThreshold": "INFORMATIONAL"
}
},
{
"id": "978a1803-2e29-42c1-832a-ddfbb836c051",
"name": "test Default sensitive data policy",
"description": "Default built-in policy for sensitive data scanning",
"type": "SENSITIVE_DATA",
"builtin": false,
"projects": null,
"policyLifecycleEnforcements": [
{
"enforcementMethod": "AUDIT",
"deploymentLifecycle": "CLI"
}
],
"ignoreRules": null,
"lifecycleTargets": null,
"Default": false,
"params": {
"__typename": "cicdscanpolicyparamssensitivedata",
"dataFindingSeverityThreshold": "",
"countThreshold": 0
}
}
],
"extraInfo": null,
"tags": null,
"outdatedPolicies": [],
"taggedResource": null,
"scanOriginResource": {
"__typename": "CICDScanOriginContainerImage",
"name": "registry.sss.com/test.ai/services/api/release-3-967-0:latest",
"id": null,
"digest": null,
"imageLabels": null
},
"result": {
"__typename": "CICDDiskScanResult",
"osPackages": null,
"libraries": null,
"applications": null,
"cpes": null,
"secrets": [
{
"id": "fcc00ecc-249b-5723-84fc-729aca5a5a67",
"externalId": null,
"description": "GCP Service Account Key ([email protected])",
"path": "/app/keys/gcp.json",
"lineNumber": 5,
"offset": 141,
"type": "CLOUD_KEY",
"contains": [
{
"name": "GCP Service Account Key ([email protected])",
"type": "CLOUD_KEY"
}
],
"snippet": null,
"failedPolicyMatches": [
{
"policy": {
"id": "5a03dfb5-99ff-49b6-8a48-a9b65b13bf9a",
"name": "test Default secrets policy",
"description": "Default built-in policy for secret scanning",
"type": "SECRETS",
"builtin": false,
"projects": null,
"policyLifecycleEnforcements": [
{
"enforcementMethod": "BLOCK",
"deploymentLifecycle": "CLI",
"enforcementConfig": null
}
],
"ignoreRules": null,
"lifecycleTargets": null,
"Default": false,
"params": {
"__typename": "cicdscanpolicyparamssecrets",
"countThreshold": 1,
"pathAllowList": [
"/.git/config",
".git/config"
],
"secretFindingSeverityThreshold": "INFORMATIONAL"
}
},
"ignoreReason": null,
"matchedIgnoreRules": null
}
],
"hasAdminPrivileges": null,
"hasHighPrivileges": null,
"severity": "HIGH",
"relatedEntities": null,
"ignoredPolicyMatches": null,
"details": {
"__typename": "DiskScanSecretDetailsCloudKey",
"providerUniqueID": "[email protected]",
"keyType": 3,
"isLongTerm": true
}
}
],
"dataFindings": null,
"vulnerableSBOMArtifactsByNameVersion": null,
"hostConfiguration": {
"hostConfigurationFrameworks": null,
"hostConfigurationFindings": null,
"analytics": null
},
"failedPolicyMatches": [
{
"policy": {
"id": "9bf73b16-99e7-4a54-af1e-dcfa1436a8f2",
"name": "test Default vulnerabilities policy ( Updated )",
"description": "Default built-in policy",
"type": "VULNERABILITIES",
"builtin": false,
"projects": null,
"policyLifecycleEnforcements": [
{
"enforcementMethod": "BLOCK",
"deploymentLifecycle": "CLI",
"enforcementConfig": null
}
],
"ignoreRules": null,
"lifecycleTargets": null,
"Default": false,
"params": {
"__typename": "cicdscanpolicyparamsvulnerabilities",
"severity": "HIGH",
"packageCountThreshold": 1,
"ignoreUnfixed": true,
"packageAllowList": [],
"detectionMethods": [
"PACKAGE",
"LIBRARY",
"FILE_PATH"
],
"vulnerabilities": [],
"fixGracePeriodHours": 0,
"publishGracePeriodHours": 0,
"ignoreTransitiveVulnerabilities": true
}
},
"ignoreReason": null,
"matchedIgnoreRules": null
},
{
"policy": {
"id": "9c6726d0-1ada-4541-b6d6-3da5ca1124f9",
"name": "test Default vulnerabilities policy",
"description": "Default built-in policy",
"type": "VULNERABILITIES",
"builtin": false,
"projects": null,
"policyLifecycleEnforcements": [
{
"enforcementMethod": "BLOCK",
"deploymentLifecycle": "CLI",
"enforcementConfig": null
}
],
"ignoreRules": null,
"lifecycleTargets": null,
"Default": false,
"params": {
"__typename": "cicdscanpolicyparamsvulnerabilities",
"severity": "HIGH",
"packageCountThreshold": 1,
"ignoreUnfixed": true,
"packageAllowList": [],
"detectionMethods": [],
"vulnerabilities": [],
"fixGracePeriodHours": 0,
"publishGracePeriodHours": 0,
"ignoreTransitiveVulnerabilities": true
}
},
"ignoreReason": null,
"matchedIgnoreRules": null
},
{
"policy": {
"id": "5a03dfb5-99ff-49b6-8a48-a9b65b13bf9a",
"name": "test Default secrets policy",
"description": "Default built-in policy for secret scanning",
"type": "SECRETS",
"builtin": false,
"projects": null,
"policyLifecycleEnforcements": [
{
"enforcementMethod": "BLOCK",
"deploymentLifecycle": "CLI",
"enforcementConfig": null
}
],
"ignoreRules": null,
"lifecycleTargets": null,
"Default": false,
"params": {
"__typename": "cicdscanpolicyparamssecrets",
"countThreshold": 1,
"pathAllowList": [
"/.git/config",
".git/config"
],
"secretFindingSeverityThreshold": "INFORMATIONAL"
}
},
"ignoreReason": null,
"matchedIgnoreRules": null
}
],
"analytics": {
"vulnerabilities": {
"infoCount": 0,
"lowCount": 2,
"mediumCount": 14,
"highCount": 9,
"criticalCount": 3,
"unfixedCount": 2,
"totalCount": 28
},
"secrets": {
"privateKeyCount": 0,
"publicKeyCount": 0,
"passwordCount": 0,
"certificateCount": 0,
"cloudKeyCount": 1,
"sshAuthorizedKeyCount": 0,
"dbConnectionStringCount": 0,
"gitCredentialCount": 0,
"presignedURLCount": 0,
"saasAPIKeyCount": 0,
"infoCount": 0,
"lowCount": 0,
"mediumCount": 0,
"highCount": 0,
"criticalCount": 0,
"totalCount": 1
},
"hostConfiguration": null,
"malware": {
"infoCount": 0,
"lowCount": 0,
"mediumCount": 0,
"highCount": 0,
"criticalCount": 0,
"totalCount": 0
},
"softwareSupplyChain": null,
"filesScannedCount": 2666,
"directoriesScannedCount": 161
},
"sbomOutput": "",
"malwares": null,
"softwareSupplyChain": null
},
"reportUrl": "https://app.wiz.io/findings/cicd-scans#~%2528cicd_scan~%25278001d6bd-2b30-419d-8819-a3e962c90d42%252A2c2025-05-07T13%2525%25252A3a46%2525%25252A3a31.95780963Z%2527%2529"
}


All finding details can be found in the DryRun Security Dashboard.

@OsamaMahmood OsamaMahmood changed the base branch from master to bugfix May 14, 2025 13:17
OsamaMahmood (Contributor, Author) commented May 15, 2025

✅ Test Scan Results – Parser Behavior & Deduplication

1. scan_img.json

  • Status: ✅ Working as expected
  • Details: Deduplication is functioning correctly. No duplicate findings are created on reimport.

2. scan_dir.json

  • Status: ✅ Working as expected
  • Details: Deduplication is functioning correctly. No duplicate findings are created on reimport.

3. scan_iac.json

  • Status: ❌ Not working as expected
  • Issue: When the same scan result is reimported, the system closes the existing finding and re-creates it as a new one instead of recognizing it as a duplicate.
  • Expected Behavior: The system should retain the original finding and not create a duplicate on reimport. We are already setting unique_id_from_tool so deduplication should work; it is not clear what is happening.

@valentijnscholten (Member) left a comment


Thank you @OsamaMahmood for your extensive PR. We do have some feedback:

  • Could you look at updating the tests/samples scans to reflect the updates to the parsers?
  • Could you look at using the hash code configuration for deduplication?

I just raised #12463 to clarify the use of the unique_id_from_tool field. Its intended/accepted use is to contain a value present in the report that can be used to recognize the finding inside the tool, and to enable strong and exact deduplication.
We will discuss internally if/how we can accommodate values computed by the parser that might be useful for deduplication.

@github-actions github-actions bot added the helm label Jun 1, 2025
@OsamaMahmood (Contributor, Author)

Could you look at using the hash code configuration for deduplication?

Hi @valentijnscholten, I have updated settings.dist.py to use hash code for deduplication. The results are the same as with unique_id_from_tool: findings are deduplicated correctly for Wizcli Dir Scan and Wizcli Img Scan, but the Wizcli IaC Scan still misbehaves; even when I upload the same report, it closes some findings even though it is the same report.

DefectDojo release bot and others added 2 commits June 9, 2025 14:35
@valentijnscholten valentijnscholten added this to the 2.47.3 milestone Jun 9, 2025

dryrunsecurity bot commented Jun 12, 2025

DryRun Security

This pull request introduces potential information disclosure risks through detailed metadata and error logging in Wizcli parsers, which could expose internal system information if logs or interfaces are not properly secured.

Sensitive Scan Metadata Exposure in dojo/settings/settings.dist.py
Vulnerability Sensitive Scan Metadata Exposure
Description The changes to Wizcli parsers introduce detailed metadata fields like file paths, line numbers, and component details. While not an active exploit, these fields could expose internal system information if not properly handled. The risk is primarily in potential downstream exposure through application interfaces or logs.

    "Red Hat Satellite": ["description", "severity"],
    "Qualys Hacker Guardian Scan": ["title", "severity", "description"],
    "Cyberwatch scan (Galeax)": ["title", "description", "severity"],
    "Wizcli Img Scan": ["title", "file_path", "line", "component_name", "component_version"],
    "Wizcli Dir Scan": ["title", "file_path", "line", "component_name", "component_version"],
    "Wizcli IaC Scan": ["title", "file_path", "line", "component_name"],
}
# Override the hardcoded settings here via the env var
# Override the hardcoded settings here via the env var

Potential Logging Information Disclosure in dojo/tools/wizcli_common_parsers/parsers.py
Vulnerability Potential Logging Information Disclosure
Description Error logging in the Wizcli parsers includes detailed exception messages that could reveal internal system details if logs are improperly secured. The logging includes file paths, component names, and parsing errors that should be carefully managed.

import logging
import re

from dojo.models import Finding

logger = logging.getLogger(__name__)

# Mapping from Wiz severities to DefectDojo severities
SEVERITY_MAPPING = {
    "CRITICAL": "Critical",
    "HIGH": "High",
    "MEDIUM": "Medium",
    "LOW": "Low",
    "INFORMATIONAL": "Info",
    "INFO": "Info",
    "UNKNOWN": "Info",  # Default for unknown severities
}


class WizcliParsers:

    @staticmethod
    def get_severity(severity_str):
        """Maps Wiz severity strings to DefectDojo standard TitleCase."""
        if severity_str:
            return SEVERITY_MAPPING.get(severity_str.upper(), "Info")
        return "Info"  # Default if severity is missing or None

    @staticmethod
    def extract_reference_link(text):
        """Extracts potential URL from remediation instructions."""
        if not text:
            return None
        # Basic regex to find URLs, might need refinement
        match = re.search(r"(https?://[^\s)]+)", text)
        return match.group(1) if match else None

    @staticmethod
    def parse_libraries(libraries_data, test):
        """Parses library vulnerability data into granular DefectDojo findings."""
        findings_list = []
        if not libraries_data:
            return findings_list
        for lib_item in libraries_data:
            lib_name = lib_item.get("name", "N/A")
            lib_version = lib_item.get("version", "N/A")
            lib_path = lib_item.get("path", "N/A")
            lib_line = lib_item.get("startLine")
            vulnerabilities_in_lib_instance = lib_item.get("vulnerabilities", [])
            if not vulnerabilities_in_lib_instance:
                continue
            for vuln_data in vulnerabilities_in_lib_instance:
                vuln_name = vuln_data.get("name", "N/A")
                severity_str = vuln_data.get("severity")
                severity = WizcliParsers.get_severity(severity_str)
                fixed_version = vuln_data.get("fixedVersion", "N/A")
                source_url = vuln_data.get("source", "N/A")
                vuln_description_from_wiz = vuln_data.get("description")
                score_str = vuln_data.get("score")
                has_exploit = vuln_data.get("hasExploit", False)
                has_cisa_kev_exploit = vuln_data.get("hasCisaKevExploit", False)
                title = f"{lib_name} {lib_version} - {vuln_name}"
                description_parts = [
                    f"**Vulnerability**: `{vuln_name}`",
                    f"**Severity**: {severity}",
                    f"**Library**: `{lib_name}`",
                    f"**Version**: `{lib_version}`",
                    f"**Path/Manifest**: `{lib_path}`",
                ]
                if lib_line is not None:
                    description_parts.append(f"**Line in Manifest**: {lib_line}")
                if fixed_version:
                    description_parts.append(f"**Fixed Version**: {fixed_version}")
                    mitigation = f"Update `{lib_name}` to version `{fixed_version}` or later in path/manifest `{lib_path}`."
                else:
                    description_parts.append("**Fixed Version**: N/A")
                    mitigation = f"No fixed version available from Wiz. Investigate `{vuln_name}` for `{lib_name}` in `{lib_path}` and apply vendor guidance or risk acceptance."
                description_parts.append(f"**Source**: {source_url}")
                if vuln_description_from_wiz:
                    description_parts.append(f"\n**Details from Wiz**:\n{vuln_description_from_wiz}\n")
                if score_str is not None:
                    description_parts.append(f"**CVSS Score (from Wiz)**: {score_str}")
                description_parts.extend([
                    f"**Has Exploit (Known)**: {has_exploit}",
                    f"**In CISA KEV**: {has_cisa_kev_exploit}",
                ])
                failed_policies = vuln_data.get("failedPolicyMatches", [])
                if failed_policies:
                    description_parts.append("\n**Failed Policies**:")
                    for match in failed_policies:
                        policy = match.get("policy", {})
                        description_parts.append(f"- {policy.get('name', 'N/A')} (ID: {policy.get('id', 'N/A')})")
                ignored_policies = vuln_data.get("ignoredPolicyMatches", [])
                if ignored_policies:
                    description_parts.append("\n**Ignored Policies**:")
                    for match in ignored_policies:
                        policy = match.get("policy", {})
                        reason = match.get("ignoreReason", "N/A")
                        description_parts.append(f"- {policy.get('name', 'N/A')} (ID: {policy.get('id', 'N/A')}), Reason: {reason}")
                full_description = "\n".join(description_parts)
                references = source_url if source_url != "N/A" else None
                finding = Finding(
                    test=test,
                    title=title,
                    description=full_description,
                    severity=severity,
                    mitigation=mitigation,
                    file_path=lib_path,
                    line=lib_line if lib_line is not None else 0,
                    component_name=lib_name,
                    component_version=lib_version,
                    static_finding=True,
                    dynamic_finding=False,
                    vuln_id_from_tool=vuln_name,
                    references=references,
                    active=True,  # Always set as active since we don't have status from Wiz
                )
                if score_str is not None:
                    try:
                        finding.cvssv3_score = float(score_str)
                    except (ValueError, TypeError):
                        logger.warning(f"Could not convert score '{score_str}' to float for finding '{title}'.")
                if isinstance(vuln_name, str) and vuln_name.upper().startswith("CVE-"):
                    finding.cve = vuln_name
                findings_list.append(finding)
        return findings_list

    @staticmethod
    def parse_secrets(secrets_data, test):
        """Parses secret findings into granular DefectDojo findings."""
        findings_list = []
        if not secrets_data:
            return findings_list
        for secret in secrets_data:
            secret_description = secret.get("description", "Secret detected")
            secret_type = secret.get("type", "UNKNOWN_TYPE")
            file_path = secret.get("path", "N/A")
            line_number = secret.get("lineNumber")
            severity_str = secret.get("severity")
            severity = WizcliParsers.get_severity(severity_str)
            title = f"Secret Detected: {secret_description} ({secret_type})"
            description_parts = [
                f"**Type**: `{secret_type}`",
                f"**Description**: {secret_description}",
                f"**File**: `{file_path}`",
            ]
            if line_number is not None:
                description_parts.append(f"**Line**: {line_number}")
            details = secret.get("details", {})
            detail_type = details.get("__typename")
            if detail_type == "DiskScanSecretDetailsPassword":
                description_parts.append("\n**Password Details**:")
                if (pw_len := details.get("length")) is not None:
                    description_parts.append(f"- Length: {pw_len}")
                if (is_complex := details.get("isComplex")) is not None:
                    description_parts.append(f"- Complex: {is_complex}")
            elif detail_type == "DiskScanSecretDetailsCloudKey":
                description_parts.append("\n**Cloud Key Details**:")
                if (provider_id := details.get("providerUniqueID")):
                    description_parts.append(f"- Provider Unique ID: {provider_id}")
                if (key_type_num := details.get("keyType")) is not None:
                    description_parts.append(f"- Key Type Code: {key_type_num}")
                if (is_long_term := details.get("isLongTerm")) is not None:
                    description_parts.append(f"- Long Term Key: {is_long_term}")
            failed_policies = secret.get("failedPolicyMatches", [])
            if failed_policies:
                description_parts.append("\n**Failed Policies**:")
                for match in failed_policies:
                    policy = match.get("policy", {})
                    description_parts.append(f"- {policy.get('name', 'N/A')} (ID: {policy.get('id', 'N/A')})")
            full_description = "\n".join(description_parts)
            mitigation = "Rotate the exposed secret immediately. Remove the secret from the specified file path and line. Store secrets securely using a secrets management solution. Review commit history."
            finding = Finding(
                test=test,
                title=title,
                description=full_description,
                severity=severity,
                mitigation=mitigation,
                file_path=file_path,
                line=line_number if line_number is not None else 0,
                static_finding=True,
                dynamic_finding=False,
                active=True,  # Always set as active since we don't have status from Wiz
            )
            findings_list.append(finding)
        return findings_list

    @staticmethod
    def parse_os_packages(os_packages_data, test):
        """Parses OS package vulnerabilities into granular DefectDojo findings."""
        findings_list = []
        if not os_packages_data:
            return findings_list
        for os_pkg in os_packages_data:
            pkg_name = os_pkg.get("name", "N/A")
            pkg_version = os_pkg.get("version", "N/A")
            vulnerabilities = os_pkg.get("vulnerabilities", [])
            if not vulnerabilities:
                continue
            for vuln_data in vulnerabilities:
                vuln_name = vuln_data.get("name", "N/A")
                severity_str = vuln_data.get("severity")
                severity = WizcliParsers.get_severity(severity_str)
                fixed_version = vuln_data.get("fixedVersion", "N/A")
                source_url = vuln_data.get("source", "N/A")
                vuln_description_from_wiz = vuln_data.get("description")
                score_str = vuln_data.get("score")
                has_exploit = vuln_data.get("hasExploit", False)
                has_cisa_kev_exploit = vuln_data.get("hasCisaKevExploit", False)
                title = f"OS Pkg: {pkg_name} {pkg_version} - {vuln_name}"
                description_parts = [
                    f"**Vulnerability**: `{vuln_name}`",
                    f"**Severity**: {severity}",
                    f"**OS Package**: `{pkg_name}`",
                    f"**Version**: `{pkg_version}`",
                ]
                if fixed_version:
                    description_parts.append(f"**Fixed Version**: {fixed_version}")
                    mitigation = f"Update OS package `{pkg_name}` to version `{fixed_version}` or later."
                else:
                    description_parts.append("**Fixed Version**: N/A")
                    mitigation = f"Patch or update OS package `{pkg_name}` as per vendor advisory for `{vuln_name}`."
                description_parts.append(f"**Source**: {source_url}")
                if vuln_description_from_wiz:
                    description_parts.append(f"\n**Details from Wiz**:\n{vuln_description_from_wiz}\n")
                if score_str is not None:
                    description_parts.append(f"**CVSS Score (from Wiz)**: {score_str}")
                description_parts.extend([
                    f"**Has Exploit (Known)**: {has_exploit}",
                    f"**In CISA KEV**: {has_cisa_kev_exploit}",
                ])
                failed_policies = vuln_data.get("failedPolicyMatches", [])
                if failed_policies:
                    description_parts.append("\n**Failed Policies**:")
                    for match in failed_policies:
                        policy = match.get("policy", {})
                        description_parts.append(f"- {policy.get('name', 'N/A')} (ID: {policy.get('id', 'N/A')})")
                ignored_policies = vuln_data.get("ignoredPolicyMatches", [])
                if ignored_policies:
                    description_parts.append("\n**Ignored Policies**:")
                    for match in ignored_policies:
                        policy = match.get("policy", {})
                        description_parts.append(f"- {policy.get('name', 'N/A')} (ID: {policy.get('id', 'N/A')})")
                full_description = "\n".join(description_parts)
                references = source_url if source_url != "N/A" else None
                finding = Finding(
                    test=test,
                    title=title,
                    description=full_description,
                    severity=severity,
                    mitigation=mitigation,
                    static_finding=True,
                    dynamic_finding=False,
                    component_name=pkg_name,
                    component_version=pkg_version,
                    vuln_id_from_tool=vuln_name,
                    references=references,
                    active=True,  # Always set as active since we don't have status from Wiz
                )
                if score_str is not None:
                    try:
                        finding.cvssv3_score = float(score_str)
                    except (ValueError, TypeError):
                        logger.warning(f"Could not convert score '{score_str}' to float for finding '{title}'.")
                if isinstance(vuln_name, str) and vuln_name.upper().startswith("CVE-"):
                    finding.cve = vuln_name
                findings_list.append(finding)
        return findings_list

    @staticmethod
    def parse_rule_matches(rule_matches_data, test):
        """
        Parses IaC rule match data into granular DefectDojo findings.

        Creates one finding per rule match instance on a specific resource.
        """
        findings_list = []
        if not rule_matches_data:
            logger.debug("No ruleMatches data found to parse.")
            return findings_list
        for rule_match in rule_matches_data:
            rule = rule_match.get("rule", {})
            rule_id = rule.get("id", "N/A")
            rule_name = rule.get("name", "Unnamed Rule")
            # Use the severity from the ruleMatch level
            severity_str = rule_match.get("severity")
            severity = WizcliParsers.get_severity(severity_str)
            matches = rule_match.get("matches", [])
            if not matches:
                continue
            for match in matches:
                resource_name = match.get("resourceName", "N/A")
                file_name = match.get("fileName", "N/A")
                line_number = match.get("lineNumber")  # Can be None or int
                match_content = match.get("matchContent", "N/A")  # Code snippet
                expected = match.get("expected", "N/A")
                found = match.get("found", "N/A")
                file_type = match.get("fileType", "IaC")  # e.g., TERRAFORM, KUBERNETES
                remediation = match.get("remediationInstructions")  # Can be None
                # Title: IaC: Rule Name - Resource Name (e.g., IaC: S3 Bucket Logging Disabled - my-bucket)
                title = f"{rule_name} - {resource_name}"
                # Description
                description_parts = [
                    f"**Rule**: {rule_name} (ID: `{rule_id}`)",
                    f"**Severity**: {severity}",
                    f"**Resource**: `{resource_name}`",
                    f"**File**: `{file_name}`",
                ]
                if line_number is not None:
                    description_parts.append(f"**Line**: {line_number}")
                if match_content and match_content != "N/A":
                    description_parts.append(f"**Code Snippet**: ```\n{match_content}\n```")  # Use markdown code block
                description_parts.extend([
                    "\n**Finding Details**:",
                    f"- **Expected**: {expected}",
                    f"- **Found**: {found}",
                    f"- **File Type**: {file_type}",
                ])
                # Use remediationInstructions as mitigation and potentially extract reference
                mitigation = remediation or "Refer to Wiz rule details and vendor documentation."
                references = WizcliParsers.extract_reference_link(remediation)
                # Policy Information (from match level first, then rule level)
                match_failed_policies = match.get("failedPolicies", [])
                rule_failed_policies = rule_match.get("failedPolicyMatches", [])  # Top-level rule match policies
                if match_failed_policies or rule_failed_policies:
                    description_parts.append("\n**Failed Policies**:")
                    processed_policy_ids = set()
                    for pol_match in match_failed_policies + rule_failed_policies:
                        policy = pol_match.get("policy", {})
                        pol_id = policy.get("id")
                        if pol_id and pol_id not in processed_policy_ids:
                            description_parts.append(f"- {policy.get('name', 'N/A')} (ID: {pol_id})")
                            processed_policy_ids.add(pol_id)
                match_ignored_policies = match.get("ignoredPolicyMatches", [])
                rule_ignored_policies = []  # Ignored policies seem to only be at the match level in the sample
                if match_ignored_policies or rule_ignored_policies:
                    description_parts.append("\n**Ignored Policies**:")
                    processed_policy_ids = set()
                    for pol_match in match_ignored_policies + rule_ignored_policies:
                        policy = pol_match.get("policy", {})
                        pol_id = policy.get("id")
                        reason = pol_match.get("ignoreReason", "N/A")
                        if pol_id and pol_id not in processed_policy_ids:
                            description_parts.append(f"- {policy.get('name', 'N/A')} (ID: {pol_id}), Reason: {reason}")
                            processed_policy_ids.add(pol_id)
                full_description = "\n".join(description_parts)
                finding = Finding(
                    test=test,
                    title=title,
                    description=full_description,
                    severity=severity,
                    mitigation=mitigation,
                    file_path=file_name,
                    line=line_number if line_number is not None else 0,
component_name=resource_name, # Use resource name as component
static_finding=True,
dynamic_finding=False,
vuln_id_from_tool=rule_id, # Use rule ID as the identifier
references=references,
active=True, # Always set as active since we don't have status from Wiz
)
findings_list.append(finding)
return findings_list
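For context, here is a minimal example of the `ruleMatches` shape this method consumes. The key names are taken from the parser above; all values are illustrative, not real Wiz output:

```python
# Illustrative ruleMatches entry; only the keys read by parse_rule_matches are shown.
rule_match = {
    "rule": {"id": "wiz-rule-123", "name": "S3 Bucket Logging Disabled"},
    "severity": "MEDIUM",
    "matches": [
        {
            "resourceName": "my-bucket",
            "fileName": "main.tf",
            "lineNumber": 12,
            "matchContent": 'resource "aws_s3_bucket" "my-bucket" {}',
            "expected": "logging enabled",
            "found": "logging disabled",
            "fileType": "TERRAFORM",
            "remediationInstructions": None,
        },
    ],
}

# The finding title is built as "<rule name> - <resource name>":
title = f"{rule_match['rule']['name']} - {rule_match['matches'][0]['resourceName']}"
print(title)  # S3 Bucket Logging Disabled - my-bucket
```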
@staticmethod
def convert_status(wiz_status) -> dict:
    """Convert the Wiz status string to a dict of Finding status flags."""
    if wiz_status is not None and wiz_status.upper() == "OPEN":
        return {"active": True}
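The deduplication discussion below centers on making `unique_id_from_tool` stable. A minimal sketch of the order-independent id generation the PR description outlines — this is a hypothetical standalone helper, not the parser's actual `_generate_unique_id`:

```python
import hashlib


def stable_unique_id(components):
    """Drop None values, strip whitespace, and sort before hashing, so the
    resulting id does not depend on the order components are supplied in."""
    cleaned = sorted(str(c).strip() for c in components if c is not None)
    return hashlib.sha256("|".join(cleaned).encode("utf-8")).hexdigest()


# The same components in a different order yield the same id:
id_a = stable_unique_id(["CVE-2024-1234", "libexample", None, " 1.2.3 "])
id_b = stable_unique_id([" 1.2.3 ", "libexample", "CVE-2024-1234", None])
assert id_a == id_b
```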


@valentijnscholten (Member) left a comment:
Thanks! Two more things:

  • see my comment on the hash code fields
  • because the dedupe config has changed AND the title is now set differently, this needs some docs in the upgrade notes for 2.47.3.

Can you add instructions on how to recalculate the hash codes (see other releases for a starting point), and a line stating that dedupe can mismatch between findings imported by the new parser and the old parser (because of the change in values for the title field)?
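For reference, recent DefectDojo release notes recalculate hash codes with the `dedupe` management command. A hedged sketch of what the 2.47.3 upgrade note could contain — the exact parser name and flags should be checked against the actual release notes:

```shell
# Recalculate hash codes only (no full re-dedupe) for findings imported
# by the Wiz CLI parser; run inside the uwsgi container.
docker compose exec uwsgi /bin/bash -c \
  "python manage.py dedupe --parser 'Wizcli Img Scan' --hash_code_only"
```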

@OsamaMahmood (Contributor, Author) commented:
Hi @valentijnscholten, all changes are done; requesting review.

@valentijnscholten (Member) commented:
@Maffooch @mtesauro Is there already a guideline/agreement on how to handle changes in parsers that affect deduplication? Sometimes recalculating the hash codes is enough, but in this case the title field is changing: existing findings in DefectDojo will have a different title and hash code as a result. There is a note in the upgrade notes.
